package org.example.launch

import com.alibaba.fastjson.serializer.SerializerFeature
import com.alibaba.fastjson.{JSON, JSONObject}
import org.example.common.{Logging, Sparking}
import org.example.constant.ApolloConst
import org.example.dao.{MysqlConfig, RealLocation, TrafficDayIndex}
import org.example.utils.{CommonUtils, RedisSink, ZkManager}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession, functions}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, TaskContext}
import org.elasticsearch.spark.rdd.EsSpark
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

import scala.collection.mutable.{ArrayBuffer, Map}

/**
 * Real-time day-level statistics of safe driving mileage.
 *
 * Consumes GPS location messages from Kafka, validates and de-duplicates them,
 * computes per-point driving time/distance against the previous point kept in
 * Redis, joins platform and enterprise dimension tables loaded from MySQL,
 * aggregates per vehicle/driver/day and writes the indexes to Elasticsearch.
 */
object TrafficDayToES extends Sparking with Logging {
  val mysqlConfig = MysqlConfig("jdbc", ApolloConst.jgdMysqlURL, "", ApolloConst.jgdMysqlUserName, ApolloConst.jgdMysqlPassWord, "")

  // Onboarded vehicle info joined with its enterprise (dimension table t1).
  val TEMP_BASE_INTO_VEHICLE_INFO_SQL =
    """
      |(select
      |enterprise.enterprise_name as enterprisename,
      |enterprise.enterprise_code as enterprisecode,
      |enterprise.address,
      |enterprise.province,
      |enterprise.city,
      |enterprise.area,
      |vehicle.business_scope as businessscope,
      |vehicle.business_scope_detail as businessscopedetail,
      |vehicle.class_line as classline,
      |vehicle.driving_permit_no as drivingpermitno,
      |vehicle.enterprise_code as vehicle_enterprise_code,
      |vehicle.indentifier,
      |vehicle.into_status,
      |vehicle.into_time,
      |vehicle.operating_certificate_no as operatingcertificateno,
      |vehicle.use_nature as usenature,
      |vehicle.vehicle_brand as vehiclebrand,
      |vehicle.vehicle_code,
      |vehicle.plate_color as vehicleColor,
      |vehicle.plate_num as vehicleNo
      |from
      |zcov.basic_vehicle_info as vehicle
      |left join
      |zcov.basic_enterprise_info as enterprise
      |on
      |vehicle.enterprise_code=enterprise.enterprise_code) t1
      |""".stripMargin

  // Platform basic info (dimension table t2).
  val TEMP_BASE_INTO_PLATFORM_INFO_SQL =
    """
      |(select
      | platform_code as platformCode,
      | platform_name as platformName
      |from
      |zcov.basic_into_platform_info) t2
      |""".stripMargin

  // Location info joined with platform info and enterprise/vehicle info.
  val TEMP_REAL_LOCATION_PLATE_SQL =
    """
      |select locationinfo.appId,
      |       plat.platformName,  --平台名称
      |       locationinfo.msgId,
      |       locationinfo.dataType,
      |       locationinfo.vehicleNo,
      |       locationinfo.vehicleColor,
      |       locationinfo.alarm,
      |       locationinfo.altitude,
      |       locationinfo.dateTime,
      |       locationinfo.direction,
      |       locationinfo.state,
      |       locationinfo.vec1,
      |       locationinfo.vec2,
      |       locationinfo.vec3,
      |       locationinfo.businessTime,
      |       locationinfo.lon,
      |       locationinfo.lat,
      |       --locationinfo.districtCode,
      |       locationinfo.drivertime,
      |       locationinfo.driverange,
      |       locationinfo.mileg,
      |       null as ewaybillinfo,
      |       null as driverName,
      |       null as driverId,
      |       null as licence,
      |       null as orgname,
      |       null as recognitiontime,
      |       vehiclenterprise.enterprisecode as enterpriseCode,
      |       vehiclenterprise.enterprisename as enterpriseName,
      |       vehiclenterprise.address,
      |       vehiclenterprise.province,
      |       vehiclenterprise.city,
      |       vehiclenterprise.area,
      |       vehiclenterprise.businessscope,
      |       vehiclenterprise.businessscopedetail,
      |       vehiclenterprise.indentifier,
      |       vehiclenterprise.operatingcertificateno,
      |       vehiclenterprise.classline,
      |       vehiclenterprise.usenature,
      |       vehiclenterprise.drivingpermitno,
      |       vehiclenterprise.vehiclebrand,
      |       locationinfo.dayTime
      |from TEMP_REAL_LOCATION as locationinfo
      |left join TEMP_PLATFORM_INFO as plat
      |     on locationinfo.appId = plat.platformCode
      |left join TEMP_VEHICLE_INFO as vehiclenterprise
      |     on locationinfo.vehicleNo = vehiclenterprise.vehicleNo
      |where locationinfo.vec1 != ''
    """.stripMargin

  /**
   * Entry point: wires up the 5-second streaming job, consuming the GPS
   * topics from the offsets last committed to ZooKeeper.
   */
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = CommonUtils.getSparkSession()
    val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(5))
    val broadRedisSink = ssc.sparkContext.broadcast(RedisSink())

    // Register the MySQL-backed dimension tables once, on the driver.
    createTempTable(sparkSession, mysqlConfig)
    val zkManager = ZkManager(ApolloConst.zkKafka)
    val kafkaParams = getKafkaParams(ApolloConst.bootstrap, "traffic3.2")
    val offsets = zkManager.getBeginOffset(ApolloConst.GPS_TOPICS, "traffic3.2")
    val offsetRanges = new ArrayBuffer[OffsetRange]()
    val inputStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](ApolloConst.GPS_TOPICS, kafkaParams, offsets))
    inputStream.transform { rdd =>
      // Remember this batch's Kafka offsets; they are committed to ZooKeeper
      // only after the batch has been processed successfully.
      offsetRanges.clear()
      offsetRanges.append(rdd.asInstanceOf[HasOffsetRanges].offsetRanges: _*)
      rdd
    }.map(x => x.value()).foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Tag each record by its source topic; only real-time (0) and
        // history/back-fill (1) location topics are kept downstream.
        val rddClass = rdd.map { row =>
          val off: OffsetRange = offsetRanges(TaskContext.get.partitionId)
          if (off.topic.equals("UP_EXG_MSG_REAL_LOCATION")) {
            (0, row)
          } else if (off.topic.equals("UP_EXG_MSG_HISTORY_LOCATION")) {
            (1, row)
          } else {
            (2, row)
          }
        }
        val startTime = DateTime.now()
        try {
          val rdd1 = rddClass.filter(line => line._1 == 0 || line._1 == 1)
            .coalesce(20)
            .map { x =>
              // Malformed JSON is logged here and dropped by the filter below.
              var data: JSONObject = null
              try {
                data = JSON.parseObject(x._2)
              } catch {
                case e: Exception =>
                  error(s"该条数据格式错误:${x}")
              }
              data
            }
            .filter(x => null != x && "0x1200".equals(x.getString("msgId")) && ("0x1203".equals(x.getString("dataType")) || "0x1202".equals(x.getString("dataType"))))

          val gpsRdd: RDD[JSONObject] = rdd1.flatMap { x =>
            val result = if ("0x1200".equals(x.getString("msgId")) && "0x1203".equals(x.getString("dataType"))) {
              // 0x1203 is a back-fill message; expand it into single points.
              CommonUtils.addLocation(x)
            } else {
              Array(x)
            }
            result
          }.map { json =>
            // Normalize dateTime; fall back to businessTime on a bad format.
            var dateTime = ""
            try {
              dateTime = DateTime.parse(json.getString("dateTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd HH:mm:ss")
            } catch {
              case _: Exception =>
                info("error datetime format" + json.getString("dateTime"))
                dateTime = DateTime.parse(json.getString("businessTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd HH:mm:ss")
            }
            json.put("dateTime", dateTime)
            json
          }.groupBy(x => (x.getString("vehicleColor"), x.getString("vehicleNo"), x.getString("dateTime")))
            // De-duplicate: keep the record with the latest businessTime per
            // (vehicleColor, vehicleNo, dateTime).
            .map(x => x._2.maxBy(y => y.getString("businessTime")))
            .filter { x =>
              // Sanity checks: fixed-width lon (9) / lat (8), 19-char
              // timestamps, and businessTime at most one day after dateTime.
              var result = true
              try {
                if (9 != x.getString("lon").length
                  || 8 != x.getString("lat").length
                  || 19 != x.getString("businessTime").length
                  || 19 != x.getString("dateTime").length) {
                  result = false
                }
                val dateTime = DateTime.parse(x.getString("dateTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"))
                val businessTime = DateTime.parse(x.getString("businessTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"))
                if (businessTime.getMillis - dateTime.plusDays(1).getMillis > 0) {
                  result = false
                }
              } catch {
                case _: Exception =>
                  result = false
              }
              result
            }

          writeEsNew(gpsRdd, broadRedisSink, sparkSession)

          // Commit offsets only after a successful batch so a failed batch
          // is replayed on restart.
          zkManager.saveEndOffset(offsetRanges, "traffic3.2")
        } catch {
          case e: Exception =>
            e.printStackTrace()
            // FIX: was rdd.map(...), a lazy transformation that never ran, so
            // the failing batch was never logged; foreach actually executes.
            rdd.foreach(str => info(str))
        }
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Computes per-point driving time/distance against the previous GPS point
   * stored in Redis (db 9), aggregates per vehicle/driver/day, accumulates the
   * day totals in Redis and writes the resulting indexes to Elasticsearch.
   *
   * @param gpsRdd         cleaned, de-duplicated location records
   * @param broadRedisSink broadcast Redis connection factory
   * @param sparkSession   session used for the dimension joins
   */
  def writeEsNew(gpsRdd: RDD[JSONObject], broadRedisSink: Broadcast[RedisSink], sparkSession: SparkSession) = {

    val odsData: RDD[JSONObject] = gpsRdd.sortBy(json => json.getString("dateTime"))
      .map { x =>
        broadRedisSink.value.usingRedis { redis =>
          redis.select(9)
          val dataTime = x.getString("dateTime")
          val dayTime = dataTime.split(" ")(0)
          val key = "vehicleGps_" + dayTime
          val filed = x.getString("vehicleNo") + "_" + x.getString("vehicleColor")
          val last = redis.hget(key, filed)
          var times = 0L
          var mileages = 0D
          var mileg = 0D
          if (null != last) {
            val lastJson = JSON.parseObject(last)
            times = CommonUtils.getBetweenSecond(lastJson.getString("dateTime"), x.getString("dateTime"))
            val lastLon = lastJson.getString("lon")
            val lastLat = lastJson.getString("lat")
            // Raw coordinates appear to be degrees * 1e6 — hence the scaling.
            val lon = java.lang.Double.parseDouble(lastLon) * 0.000001
            val lat = java.lang.Double.parseDouble(lastLat) * 0.000001
            mileages = CommonUtils.getDistance(java.lang.Double.parseDouble(x.getString("lat")) * 0.000001,
              java.lang.Double.parseDouble(x.getString("lon")) * 0.000001,
              lat, lon)
            // Maximum plausible distance for this interval at 120 km/h; used
            // below to reject GPS jumps.
            mileg = 120 * (times * 1.0 / 3600)
          }
          redis.hset(key, filed, JSON.toJSONString(x, SerializerFeature.QuoteFieldNames))
          redis.expire(key, 86400)
          x.put("vec1", x.getDouble("vec1").toString)
          x.put("vec2", x.getDouble("vec2").toString)
          x.put("dayTime", dayTime)
          x.put("drivertime", times)
          x.put("driverange", mileages)
          x.put("mileg", mileg)
          x
        }
      }

    // Keep only plausible segments: moved, interval in (0, 600) seconds, and
    // distance not above the 120 km/h ceiling computed above.
    val driveranges: RDD[RealLocation] =
      odsData.filter(x => x.getDouble("driverange") > 0D && x.getLong("drivertime") < 600L && x.getLong("drivertime") > 0 && x.getDouble("driverange") <= x.getDouble("mileg"))
        .map { x =>
          var location: RealLocation = null
          try {
            location = JSON.toJavaObject(x, classOf[RealLocation])
          } catch {
            case e: Exception =>
              error(s"该条数据格式:$x")
          }
          location
        }
        // FIX: drop records whose conversion failed; the nulls previously
        // flowed straight into toDS() below.
        .filter(location => null != location)

    if (!driveranges.isEmpty) {
      import sparkSession.implicits._
      val realFrame: Dataset[RealLocation] = driveranges.toDS().as[RealLocation]
      realFrame.createOrReplaceTempView("TEMP_REAL_LOCATION")
      val frame = sparkSession.sql(TEMP_REAL_LOCATION_PLATE_SQL)
      // Aggregate driving time/mileage per platform, enterprise, vehicle,
      // driver and day.
      val car_driver_mileageDF = frame.groupBy("appId", "platformName", "enterpriseName", "enterpriseCode", "vehicleNo", "vehicleColor", "area", "province", "city", "usenature", "driverId", "driverName", "dayTime")
        .sum("drivertime", "driverange")
        .toDF("appId", "platformName", "enterpriseName", "enterpriseCode", "vehicleNo", "vehicleColor", "area", "province", "city", "usenature", "driverId", "driverName", "dayTime", "driverTime", "driverange")
        .withColumn("id", functions.concat_ws("_", $"enterpriseCode", $"vehicleNo", $"vehicleColor", $"dayTime", $"driverId"))
        .select("id", "appId", "platformName", "enterpriseName", "enterpriseCode", "vehicleNo", "vehicleColor", "area", "province", "city", "usenature", "driverId", "driverName", "driverTime", "driverange", "dayTime")

      val trafficDayIndexRdd = car_driver_mileageDF.rdd.mapPartitions { rows =>
        RedisSink().usingRedis { redis =>
          redis.select(9)
          val indexes = rows.map { x =>
            var drivertimeTotal = x.getAs[Long]("driverTime")
            var driverangeTotal = x.getAs[Double]("driverange")
            val dayTime = x.getAs[String]("dayTime")
            val key = "vehicleDriverange_" + dayTime
            val last_time_range = redis.hget(key, x.getAs[String]("id"))
            if (null != last_time_range) {
              val time_and_range = last_time_range.split("\\|")
              // Previously accumulated total driving time (seconds).
              val lastDrivertime = time_and_range(0).toLong
              // Previously accumulated total driving distance.
              val lastDriverrange = time_and_range(1).toDouble
              drivertimeTotal = drivertimeTotal + lastDrivertime
              driverangeTotal = driverangeTotal + lastDriverrange
            }
            redis.hset(key, x.getAs[String]("id"), drivertimeTotal + "|" + driverangeTotal)
            redis.expire(key, 86400)

            // NOTE(review): the index stores the per-batch driverTime but the
            // ACCUMULATED driverangeTotal; confirm whether drivertimeTotal was
            // intended for the time field as well.
            TrafficDayIndex(x.getAs[String]("id"), x.getAs[String]("appId"),
              x.getAs[String]("enterpriseName"), x.getAs[String]("enterpriseCode"),
              x.getAs[String]("vehicleNo"), x.getAs[String]("vehicleColor"),
              x.getAs[String]("dayTime"), x.getAs[String]("driverName"),
              x.getAs[String]("driverId"), x.getAs[Long]("driverTime").toString,
              driverangeTotal, x.getAs[String]("usenature"),
              x.getAs[String]("province"), x.getAs[String]("city"),
              x.getAs[String]("area"))
          }
          // FIX: materialize while the Redis connection is still open; the
          // lazy iterator previously escaped usingRedis and was consumed
          // after the connection had been released.
          indexes.toList
        }.iterator
      }

      EsSpark.saveToEs(trafficDayIndexRdd, "traffic_day_index/traffic_day_type", Map("es.mapping.id" -> "id"))
    }
  }

  /**
   * Registers the MySQL-backed dimension tables as Spark temp views.
   */
  def createTempTable(sparkSession: SparkSession, mysqlConfig: MysqlConfig): Unit = {
    selectFromMysql(sparkSession, mysqlConfig, TEMP_BASE_INTO_VEHICLE_INFO_SQL).createOrReplaceTempView("TEMP_VEHICLE_INFO")
    selectFromMysql(sparkSession, mysqlConfig, TEMP_BASE_INTO_PLATFORM_INFO_SQL).createOrReplaceTempView("TEMP_PLATFORM_INFO")
  }

  /**
   * Loads a MySQL query result as a DataFrame via JDBC.
   *
   * @param sparkSession SparkSession
   * @param mysqlConfig  MySQL connection settings
   * @param sql          parenthesized sub-query with an alias, used as dbtable
   */
  def selectFromMysql(sparkSession: SparkSession, mysqlConfig: MysqlConfig, sql: String): DataFrame = {
    sparkSession.read
      .format(mysqlConfig.format)
      .option("url", mysqlConfig.url)
      .option("user", mysqlConfig.user)
      .option("password", mysqlConfig.password)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", sql)
      .load()
  }

}

/**
 * Lazily created, process-wide singleton [[SparkSession]] holder.
 *
 * @author ljh
 */
object SparkSessionSingleton {

  // FIX: @volatile is required for double-checked locking on the JVM —
  // without it a thread outside the synchronized block may observe a
  // partially published SparkSession reference.
  @transient @volatile private var instance: SparkSession = _

  /**
   * Returns the shared SparkSession, creating it on first call.
   *
   * @param sparkConf configuration applied only on the first creation;
   *                  subsequent calls ignore it and return the cached session
   */
  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      this.synchronized {
        if (instance == null) {
          instance = SparkSession
            .builder
            .enableHiveSupport()
            .config(sparkConf)
            .config("hive.exec.dynamic.partition", true) // enable Hive dynamic partitioning
            .config("hive.exec.dynamic.partition.mode", "nonstrict") // non-strict mode
            .config("hive.metastore.uris", ApolloConst.hiveMetastore)
            .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
            .config("spark.sql.crossJoin.enabled", "true")
            //.master("local[*]")
            .getOrCreate()
        }
      }
    }
    instance
  }
}

